package com.ssm.kafka.test;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class SimpleKafkaProducer implements Runnable {
    protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);

    @Override
    public void run() {
        Map<String, Object> sendProps = senderProps();
        Producer producer = new KafkaProducer(sendProps);
        Integer currentNum = 0;
        try {
            LOGGER.info("start produce message");
            while (true){
                ProducerRecord<Integer, String> producerRecord = new ProducerRecord<Integer, String>("ssm_java",currentNum, ""+currentNum);
                producer.send(producerRecord);
                LOGGER.info("send message:" + currentNum + " And value is " + producerRecord.value());
                currentNum++;
                Thread.sleep(1000);
            }
        }catch (Exception e){
            LOGGER.error("send message fail", e);
        }finally {
            producer.close();
        }
    }
    public static void main(String[] args) {
        SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();
        new Thread(simpleKafkaProducer).start();
    }


    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
